home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-desktop-9.10-i386-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / connection.py < prev    next >
Text File  |  2009-11-02  |  12KB  |  418 lines

  1. #
  2. # A higher level module for using sockets (or Windows named pipes)
  3. #
  4. # multiprocessing/connection.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
  7. #
  8.  
  9. __all__ = [ 'Client', 'Listener', 'Pipe' ]
  10.  
  11. import os
  12. import sys
  13. import socket
  14. import errno
  15. import time
  16. import tempfile
  17. import itertools
  18.  
  19. import _multiprocessing
  20. from multiprocessing import current_process, AuthenticationError
  21. from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
  22. from multiprocessing.forking import duplicate, close
  23.  
  24.  
  25. #
  26. #
  27. #
  28.  
  29. BUFSIZE = 8192
  30.  
  31. _mmap_counter = itertools.count()
  32.  
  33. default_family = 'AF_INET'
  34. families = ['AF_INET']
  35.  
  36. if hasattr(socket, 'AF_UNIX'):
  37.     default_family = 'AF_UNIX'
  38.     families += ['AF_UNIX']
  39.  
  40. if sys.platform == 'win32':
  41.     default_family = 'AF_PIPE'
  42.     families += ['AF_PIPE']
  43.  
  44. #
  45. #
  46. #
  47.  
  48. def arbitrary_address(family):
  49.     '''
  50.     Return an arbitrary free address for the given family
  51.     '''
  52.     if family == 'AF_INET':
  53.         return ('localhost', 0)
  54.     elif family == 'AF_UNIX':
  55.         return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
  56.     elif family == 'AF_PIPE':
  57.         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
  58.                                (os.getpid(), _mmap_counter.next()))
  59.     else:
  60.         raise ValueError('unrecognized family')
  61.  
  62.  
  63. def address_type(address):
  64.     '''
  65.     Return the types of the address
  66.  
  67.     This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
  68.     '''
  69.     if type(address) == tuple:
  70.         return 'AF_INET'
  71.     elif type(address) is str and address.startswith('\\\\'):
  72.         return 'AF_PIPE'
  73.     elif type(address) is str:
  74.         return 'AF_UNIX'
  75.     else:
  76.         raise ValueError('address type of %r unrecognized' % address)
  77.  
  78. #
  79. # Public functions
  80. #
  81.  
  82. class Listener(object):
  83.     '''
  84.     Returns a listener object.
  85.  
  86.     This is a wrapper for a bound socket which is 'listening' for
  87.     connections, or for a Windows named pipe.
  88.     '''
  89.     def __init__(self, address=None, family=None, backlog=1, authkey=None):
  90.         family = family or (address and address_type(address)) \
  91.                  or default_family
  92.         address = address or arbitrary_address(family)
  93.  
  94.         if family == 'AF_PIPE':
  95.             self._listener = PipeListener(address, backlog)
  96.         else:
  97.             self._listener = SocketListener(address, family, backlog)
  98.  
  99.         if authkey is not None and not isinstance(authkey, bytes):
  100.             raise TypeError, 'authkey should be a byte string'
  101.  
  102.         self._authkey = authkey
  103.  
  104.     def accept(self):
  105.         '''
  106.         Accept a connection on the bound socket or named pipe of `self`.
  107.  
  108.         Returns a `Connection` object.
  109.         '''
  110.         c = self._listener.accept()
  111.         if self._authkey:
  112.             deliver_challenge(c, self._authkey)
  113.             answer_challenge(c, self._authkey)
  114.         return c
  115.  
  116.     def close(self):
  117.         '''
  118.         Close the bound socket or named pipe of `self`.
  119.         '''
  120.         return self._listener.close()
  121.  
  122.     address = property(lambda self: self._listener._address)
  123.     last_accepted = property(lambda self: self._listener._last_accepted)
  124.  
  125.  
  126. def Client(address, family=None, authkey=None):
  127.     '''
  128.     Returns a connection to the address of a `Listener`
  129.     '''
  130.     family = family or address_type(address)
  131.     if family == 'AF_PIPE':
  132.         c = PipeClient(address)
  133.     else:
  134.         c = SocketClient(address)
  135.  
  136.     if authkey is not None and not isinstance(authkey, bytes):
  137.         raise TypeError, 'authkey should be a byte string'
  138.  
  139.     if authkey is not None:
  140.         answer_challenge(c, authkey)
  141.         deliver_challenge(c, authkey)
  142.  
  143.     return c
  144.  
  145.  
  146. if sys.platform != 'win32':
  147.  
  148.     def Pipe(duplex=True):
  149.         '''
  150.         Returns pair of connection objects at either end of a pipe
  151.         '''
  152.         if duplex:
  153.             s1, s2 = socket.socketpair()
  154.             c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
  155.             c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
  156.             s1.close()
  157.             s2.close()
  158.         else:
  159.             fd1, fd2 = os.pipe()
  160.             c1 = _multiprocessing.Connection(fd1, writable=False)
  161.             c2 = _multiprocessing.Connection(fd2, readable=False)
  162.  
  163.         return c1, c2
  164.  
  165. else:
  166.  
  167.     from ._multiprocessing import win32
  168.  
  169.     def Pipe(duplex=True):
  170.         '''
  171.         Returns pair of connection objects at either end of a pipe
  172.         '''
  173.         address = arbitrary_address('AF_PIPE')
  174.         if duplex:
  175.             openmode = win32.PIPE_ACCESS_DUPLEX
  176.             access = win32.GENERIC_READ | win32.GENERIC_WRITE
  177.             obsize, ibsize = BUFSIZE, BUFSIZE
  178.         else:
  179.             openmode = win32.PIPE_ACCESS_INBOUND
  180.             access = win32.GENERIC_WRITE
  181.             obsize, ibsize = 0, BUFSIZE
  182.  
  183.         h1 = win32.CreateNamedPipe(
  184.             address, openmode,
  185.             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
  186.             win32.PIPE_WAIT,
  187.             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
  188.             )
  189.         h2 = win32.CreateFile(
  190.             address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
  191.             )
  192.         win32.SetNamedPipeHandleState(
  193.             h2, win32.PIPE_READMODE_MESSAGE, None, None
  194.             )
  195.  
  196.         try:
  197.             win32.ConnectNamedPipe(h1, win32.NULL)
  198.         except WindowsError, e:
  199.             if e.args[0] != win32.ERROR_PIPE_CONNECTED:
  200.                 raise
  201.  
  202.         c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
  203.         c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
  204.  
  205.         return c1, c2
  206.  
  207. #
  208. # Definitions for connections based on sockets
  209. #
  210.  
  211. class SocketListener(object):
  212.     '''
  213.     Representation of a socket which is bound to an address and listening
  214.     '''
  215.     def __init__(self, address, family, backlog=1):
  216.         self._socket = socket.socket(getattr(socket, family))
  217.         self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  218.         self._socket.bind(address)
  219.         self._socket.listen(backlog)
  220.         self._address = self._socket.getsockname()
  221.         self._family = family
  222.         self._last_accepted = None
  223.  
  224.         if family == 'AF_UNIX':
  225.             self._unlink = Finalize(
  226.                 self, os.unlink, args=(address,), exitpriority=0
  227.                 )
  228.         else:
  229.             self._unlink = None
  230.  
  231.     def accept(self):
  232.         s, self._last_accepted = self._socket.accept()
  233.         fd = duplicate(s.fileno())
  234.         conn = _multiprocessing.Connection(fd)
  235.         s.close()
  236.         return conn
  237.  
  238.     def close(self):
  239.         self._socket.close()
  240.         if self._unlink is not None:
  241.             self._unlink()
  242.  
  243.  
  244. def SocketClient(address):
  245.     '''
  246.     Return a connection object connected to the socket given by `address`
  247.     '''
  248.     family = address_type(address)
  249.     s = socket.socket( getattr(socket, family) )
  250.  
  251.     while 1:
  252.         try:
  253.             s.connect(address)
  254.         except socket.error, e:
  255.             if e.args[0] != errno.ECONNREFUSED: # connection refused
  256.                 debug('failed to connect to address %s', address)
  257.                 raise
  258.             time.sleep(0.01)
  259.         else:
  260.             break
  261.     else:
  262.         raise
  263.  
  264.     fd = duplicate(s.fileno())
  265.     conn = _multiprocessing.Connection(fd)
  266.     s.close()
  267.     return conn
  268.  
  269. #
  270. # Definitions for connections based on named pipes
  271. #
  272.  
  273. if sys.platform == 'win32':
  274.  
  275.     class PipeListener(object):
  276.         '''
  277.         Representation of a named pipe
  278.         '''
  279.         def __init__(self, address, backlog=None):
  280.             self._address = address
  281.             handle = win32.CreateNamedPipe(
  282.                 address, win32.PIPE_ACCESS_DUPLEX,
  283.                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
  284.                 win32.PIPE_WAIT,
  285.                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
  286.                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
  287.                 )
  288.             self._handle_queue = [handle]
  289.             self._last_accepted = None
  290.  
  291.             sub_debug('listener created with address=%r', self._address)
  292.  
  293.             self.close = Finalize(
  294.                 self, PipeListener._finalize_pipe_listener,
  295.                 args=(self._handle_queue, self._address), exitpriority=0
  296.                 )
  297.  
  298.         def accept(self):
  299.             newhandle = win32.CreateNamedPipe(
  300.                 self._address, win32.PIPE_ACCESS_DUPLEX,
  301.                 win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
  302.                 win32.PIPE_WAIT,
  303.                 win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
  304.                 win32.NMPWAIT_WAIT_FOREVER, win32.NULL
  305.                 )
  306.             self._handle_queue.append(newhandle)
  307.             handle = self._handle_queue.pop(0)
  308.             try:
  309.                 win32.ConnectNamedPipe(handle, win32.NULL)
  310.             except WindowsError, e:
  311.                 if e.args[0] != win32.ERROR_PIPE_CONNECTED:
  312.                     raise
  313.             return _multiprocessing.PipeConnection(handle)
  314.  
  315.         @staticmethod
  316.         def _finalize_pipe_listener(queue, address):
  317.             sub_debug('closing listener with address=%r', address)
  318.             for handle in queue:
  319.                 close(handle)
  320.  
  321.     def PipeClient(address):
  322.         '''
  323.         Return a connection object connected to the pipe given by `address`
  324.         '''
  325.         while 1:
  326.             try:
  327.                 win32.WaitNamedPipe(address, 1000)
  328.                 h = win32.CreateFile(
  329.                     address, win32.GENERIC_READ | win32.GENERIC_WRITE,
  330.                     0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
  331.                     )
  332.             except WindowsError, e:
  333.                 if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
  334.                                      win32.ERROR_PIPE_BUSY):
  335.                     raise
  336.             else:
  337.                 break
  338.         else:
  339.             raise
  340.  
  341.         win32.SetNamedPipeHandleState(
  342.             h, win32.PIPE_READMODE_MESSAGE, None, None
  343.             )
  344.         return _multiprocessing.PipeConnection(h)
  345.  
  346. #
  347. # Authentication stuff
  348. #
  349.  
  350. MESSAGE_LENGTH = 20
  351.  
  352. CHALLENGE = b'#CHALLENGE#'
  353. WELCOME = b'#WELCOME#'
  354. FAILURE = b'#FAILURE#'
  355.  
  356. def deliver_challenge(connection, authkey):
  357.     import hmac
  358.     assert isinstance(authkey, bytes)
  359.     message = os.urandom(MESSAGE_LENGTH)
  360.     connection.send_bytes(CHALLENGE + message)
  361.     digest = hmac.new(authkey, message).digest()
  362.     response = connection.recv_bytes(256)        # reject large message
  363.     if response == digest:
  364.         connection.send_bytes(WELCOME)
  365.     else:
  366.         connection.send_bytes(FAILURE)
  367.         raise AuthenticationError('digest received was wrong')
  368.  
  369. def answer_challenge(connection, authkey):
  370.     import hmac
  371.     assert isinstance(authkey, bytes)
  372.     message = connection.recv_bytes(256)         # reject large message
  373.     assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
  374.     message = message[len(CHALLENGE):]
  375.     digest = hmac.new(authkey, message).digest()
  376.     connection.send_bytes(digest)
  377.     response = connection.recv_bytes(256)        # reject large message
  378.     if response != WELCOME:
  379.         raise AuthenticationError('digest sent was rejected')
  380.  
  381. #
  382. # Support for using xmlrpclib for serialization
  383. #
  384.  
  385. class ConnectionWrapper(object):
  386.     def __init__(self, conn, dumps, loads):
  387.         self._conn = conn
  388.         self._dumps = dumps
  389.         self._loads = loads
  390.         for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
  391.             obj = getattr(conn, attr)
  392.             setattr(self, attr, obj)
  393.     def send(self, obj):
  394.         s = self._dumps(obj)
  395.         self._conn.send_bytes(s)
  396.     def recv(self):
  397.         s = self._conn.recv_bytes()
  398.         return self._loads(s)
  399.  
  400. def _xml_dumps(obj):
  401.     return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
  402.  
  403. def _xml_loads(s):
  404.     (obj,), method = xmlrpclib.loads(s.decode('utf8'))
  405.     return obj
  406.  
  407. class XmlListener(Listener):
  408.     def accept(self):
  409.         global xmlrpclib
  410.         import xmlrpclib
  411.         obj = Listener.accept(self)
  412.         return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
  413.  
  414. def XmlClient(*args, **kwds):
  415.     global xmlrpclib
  416.     import xmlrpclib
  417.     return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
  418.